In [27]:
import numpy as np
import pandas as pd
import soundfile as sf
import scipy.spatial
import matplotlib.pyplot as plt
import IPython.display as ipd
from audio2numpy import open_audio
from numba import jit
import librosa, librosa.display
import IPython
%matplotlib inline

Fs = 44100
X_mp3, Fs = librosa.load('./Cruel Angels Thesis From MIDI.mp3', sr=Fs)
Y_mp3, Fs = librosa.load('./Cruel Angels Thesis From YouTube.mp3', sr=Fs)

N = 4410
H = 2205

X_stft = np.abs(librosa.stft(X_mp3))
Y_stft = np.abs(librosa.stft(Y_mp3))
X_chroma = librosa.feature.chroma_stft(y=X_mp3, sr=Fs)
Y_chroma = librosa.feature.chroma_stft(y=Y_mp3, sr=Fs)

plt.figure(figsize=(8, 2))
plt.title('Sequence $X$')
librosa.display.specshow(X_chroma, x_axis='frames', y_axis='chroma', cmap='inferno', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()

plt.figure(figsize=(8, 2))
plt.title('Sequence $Y$')
librosa.display.specshow(Y_chroma, x_axis='frames', y_axis='chroma', cmap='inferno', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
In [28]:
print(X_chroma.shape)
print(Y_chroma.shape)
(12, 21130)
(12, 21111)
In [29]:
def ensure_same_shape(array1, array2):
    array1_rows = array1.shape[1]
    array2_rows = array2.shape[1]
    
    difference = array1_rows - array2_rows
    
    if(difference > 0):
        array2_padded = np.pad(array2, [(0, 0), (0, difference)], mode='constant')  
        return(array1, array2_padded)
    elif(difference < 0):
        array1_padded = np.pad(array1, [(0, 0), (0, abs(difference))], mode='constant')
        return(array1_padded, array2)
    else:
        return(array1, array2)


X_chroma_standardized, Y_chroma_standardized = ensure_same_shape(X_chroma, Y_chroma)

print(X_chroma_standardized.shape)
print(Y_chroma_standardized.shape)
(12, 21130)
(12, 21130)
In [30]:
def compute_cost_matrix(X, Y, metric='euclidean'):
    """Compute the cost matrix of two feature sequences

    Notebook: C3/C3S2_DTWbasic.ipynb

    Args:
        X (np.ndarray): Sequence 1
        Y (np.ndarray): Sequence 2
        metric (str): Cost metric, a valid strings for scipy.spatial.distance.cdist (Default value = 'euclidean')

    Returns:
        C (np.ndarray): Cost matrix
    """
    X, Y = np.atleast_2d(X_chroma_standardized, Y_chroma_standardized)
    C = scipy.spatial.distance.cdist(X.T, Y.T, metric=metric)
    return C

C = compute_cost_matrix(X_mp3, Y_mp3, metric='euclidean')
print('Cost matrix C =', C, sep='\n')
Cost matrix C =
[[1.46373697 1.46373697 1.46373697 ... 1.46373697 1.46373697 1.46373697]
 [1.60108684 1.60108684 1.60108684 ... 1.60108684 1.60108684 1.60108684]
 [2.29136799 2.29136799 2.29136799 ... 2.29136799 2.29136799 2.29136799]
 ...
 [2.53758825 2.53758825 2.53758825 ... 2.53758825 2.53758825 2.53758825]
 [2.79364288 2.79364288 2.79364288 ... 2.79364288 2.79364288 2.79364288]
 [2.86072472 2.86072472 2.86072472 ... 2.86072472 2.86072472 2.86072472]]
In [31]:
@jit(nopython=True)
def compute_accumulated_cost_matrix(C):
    """Compute the accumulated cost matrix given the cost matrix

    Notebook: C3/C3S2_DTWbasic.ipynb

    Args:
        C (np.ndarray): Cost matrix

    Returns:
        D (np.ndarray): Accumulated cost matrix
    """
    N = C.shape[0]
    M = C.shape[1]
    D = np.zeros((N, M))
    D[0, 0] = C[0, 0]
    for n in range(1, N):
        D[n, 0] = D[n-1, 0] + C[n, 0]
    for m in range(1, M):
        D[0, m] = D[0, m-1] + C[0, m]
    for n in range(1, N):
        for m in range(1, M):
            D[n, m] = C[n, m] + min(D[n-1, m], D[n, m-1], D[n-1, m-1])
    return D

D =  compute_accumulated_cost_matrix(C)
print('Accumulated cost matrix D =', D, sep='\n')
print('DTW distance DTW(X, Y) =', D[-1, -1])
Accumulated cost matrix D =
[[1.46373697e+00 2.92747394e+00 4.39121091e+00 ... 2.57120403e+04
  2.57135040e+04 2.57149678e+04]
 [3.06482381e+00 3.06482381e+00 4.52856078e+00 ... 2.47154425e+04
  2.47170436e+04 2.47186447e+04]
 [5.35619179e+00 5.35619179e+00 5.35619179e+00 ... 2.47161328e+04
  2.47177339e+04 2.47193350e+04]
 ...
 [3.19114298e+04 3.19114298e+04 3.19114298e+04 ... 2.11464065e+04
  2.11477842e+04 2.11491620e+04]
 [3.19142234e+04 3.19142234e+04 3.19142234e+04 ... 2.11478224e+04
  2.11492002e+04 2.11505779e+04]
 [3.19170842e+04 3.19170842e+04 3.19170842e+04 ... 2.11493054e+04
  2.11506832e+04 2.11520609e+04]]
DTW distance DTW(X, Y) = 21152.0608776131
In [32]:
@jit(nopython=True)
def compute_optimal_warping_path(D):
    """Compute the warping path given an accumulated cost matrix

    Notebook: C3/C3S2_DTWbasic.ipynb

    Args:
        D (np.ndarray): Accumulated cost matrix

    Returns:
        P (np.ndarray): Optimal warping path
    """
    N = D.shape[0]
    M = D.shape[1]
    n = N - 1
    m = M - 1
    P = [(n, m)]
    while n > 0 or m > 0:
        if n == 0:
            cell = (0, m - 1)
        elif m == 0:
            cell = (n - 1, 0)
        else:
            val = min(D[n-1, m-1], D[n-1, m], D[n, m-1])
            if val == D[n-1, m-1]:
                cell = (n-1, m-1)
            elif val == D[n-1, m]:
                cell = (n-1, m)
            else:
                cell = (n, m-1)
        P.append(cell)
        (n, m) = cell
    P.reverse()
    return np.array(P)
        
P = compute_optimal_warping_path(D)
#print('Optimal warping path P =', P.tolist())
In [33]:
c_P = sum(C[n, m] for (n, m) in P)
print('Total cost of optimal warping path:', c_P)
print('DTW distance DTW(X, Y) =', D[-1, -1])
Total cost of optimal warping path: 21152.0608776131
DTW distance DTW(X, Y) = 21152.0608776131
In [34]:
P = np.array(P)
plt.figure(figsize=(18, 6))
plt.subplot(1, 2, 1)
plt.imshow(C, cmap='inferno', origin='lower', aspect='equal')
plt.plot(P[:, 1], P[:, 0], marker='o', color='r')
plt.clim([0, np.max(C)])
plt.colorbar()
plt.title('$C$ with optimal warping path')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')

plt.subplot(1, 2, 2)
plt.imshow(D, cmap='inferno', origin='lower', aspect='equal')
plt.plot(P[:, 1], P[:, 0], marker='o', color='r')
plt.clim([0, np.max(D)])
plt.colorbar()
plt.title('$D$ with optimal warping path')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')

plt.tight_layout()
In [35]:
X_aligned = np.zeros_like(X_stft)
Y_aligned = np.zeros_like(Y_stft)

X_aligned, Y_aligned = ensure_same_shape(X_aligned, Y_aligned)
X_stft, Y_stft = ensure_same_shape(X_stft, Y_stft)

curr_i1 = -1
curr_i2 = -1

for i, (i1, i2) in enumerate(P):
    if(i1 > curr_i1):
        X_aligned[:, i1] = X_stft[:, i1]
        curr_i1 = i1
    
    if(i2 > curr_i2):
        Y_aligned[:, i1] = Y_stft[:, i2]
        curr_i2 = i2

N = 4410
H = 2205
    
X_aligned = librosa.istft(X_aligned)
Y_aligned = librosa.istft(Y_aligned)

aligned_audio = np.column_stack((X_aligned, Y_aligned))

sf.write('aligned_audio.mp3', aligned_audio, samplerate=44100, format='mp3')

IPython.display.Audio("aligned_audio.mp3")
Out[35]:
Your browser does not support the audio element.
In [ ]: